package io.feige.rpc.producer.network.implement;

import io.feige.rpc.producer.RpcProducerService;
import io.feige.rpc.producer.config.ProducerConfig;
import io.feige.rpc.producer.network.ProducerConnectionService;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based {@link ProducerConnectionService} that binds a server socket and
 * wires each accepted connection to the decoder, encoder and invoker handler
 * supplied by the producer's protocol.
 *
 * <p>NOTE(review): the channel returned by {@code bind} is not retained, so
 * {@link #stop()} releases the factory resources without explicitly closing the
 * bound channel first — confirm this shuts down cleanly with the Netty version
 * in use.
 */
public class ProducerConnectionServiceImpl implements ProducerConnectionService {

	private static final Logger logger=LoggerFactory.getLogger(ProducerConnectionServiceImpl.class);
	
	/** Bootstrap of the running server; {@code null} while not started or after {@link #stop()}. */
	private ServerBootstrap bootstrap;
	
	/** Producer whose protocol supplies the pipeline handlers; set by {@link #start}. */
	private RpcProducerService rpcProducerService;
	
	/**
	 * Creates the server bootstrap and binds it to the configured address.
	 *
	 * <p>When {@code config.getHost()} is {@code null} the socket is bound to the
	 * port only; otherwise it is bound to the given host and port.
	 *
	 * @param rpcProducerService producer whose protocol provides the pipeline handlers
	 * @param config             source of the host (optional) and port to bind
	 * @throws RuntimeException  whatever the bind attempt throws; the thread pools
	 *                           created for this attempt are released before rethrowing
	 */
	@Override
	public void start(RpcProducerService rpcProducerService, ProducerConfig config) {
		this.rpcProducerService=rpcProducerService;
		ServerBootstrap server = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
		server.setPipelineFactory(new ServerPipelineFactory());
		try {
			if (config.getHost()!=null) {
				server.bind(new InetSocketAddress(config.getHost(), config.getPort()));
			}else{
				server.bind(new InetSocketAddress(config.getPort()));
			}
		} catch (RuntimeException e) {
			// Bind failed: do not leak the executors created above, and leave
			// the field untouched so stop() stays a safe no-op.
			server.releaseExternalResources();
			throw e;
		}
		// Publish only after a successful bind.
		bootstrap = server;
		logger.info("Producer service start on port: {}", config.getPort());
	}

	/**
	 * Releases the resources held by the server bootstrap.
	 *
	 * <p>Does nothing if the service was never started (or failed to start), and
	 * is safe to call more than once.
	 */
	@Override
	public void stop() {
		ServerBootstrap server = bootstrap;
		if (server == null) {
			return;
		}
		// Clear first so a repeated stop() does not release twice.
		bootstrap = null;
		server.releaseExternalResources();
	}
	
	/**
	 * Builds the pipeline for each new channel from the handlers exposed by the
	 * producer's protocol. The casts require the protocol to return handlers of
	 * the expected Netty types; otherwise a {@link ClassCastException} is thrown.
	 */
	private class ServerPipelineFactory implements ChannelPipelineFactory {
		
		@Override
		public ChannelPipeline getPipeline() throws Exception { 
			ChannelPipeline pipeline = Channels.pipeline(); 
			pipeline.addLast("decoder", (SimpleChannelUpstreamHandler)rpcProducerService.getProtocol().getDecoder()); 
			pipeline.addLast("encoder", (ChannelDownstreamHandler)rpcProducerService.getProtocol().getEncoder());  
			pipeline.addLast("handler", (SimpleChannelHandler)rpcProducerService.getProtocol().getServiceInvokerHandler(rpcProducerService)); 
			return pipeline;
		} 
	}

}
